AWS Step FunctionsでLambda関数の結果をS3にPUTするステートマシンを作ってみた
こんにちは。サービス開発室の武田です。
何か処理をして、その結果をS3バケットにアップロードするという操作は「よくあるパターン」のひとつでしょう。AWS Step FunctionsにはAWS SDKサービス統合が用意されており、コードを書くことなくAWSサービスと連携できます。
今回はLambda呼び出しのタスクでJSONデータを出力し、S3へPUTするタスクでそのデータをアップロードする一連のステートマシンを試してみました。もちろんLambda関数の中でS3へアップロードもできますが、データ作成とアップロードを別のタスクに分割することが今回の目的です。
ただしStep Functionsの制限から、データサイズが 256KB を超えるケースではこの方法は使えません。その場合はLambda関数の中でS3へアップロードする処理も実装してください。
CDKの準備
動作確認するにあたって準備したCDKのソースは次のものです。
import * as cdk from "aws-cdk-lib"; import { Construct } from "constructs"; import { aws_s3 as s3, aws_stepfunctions as sfn, aws_stepfunctions_tasks as sfn_tasks, aws_lambda as lambda, } from "aws-cdk-lib"; export class ExampleSfnS3UploadLambdaResultStack extends cdk.Stack { constructor(scope: Construct, id: string, props?: cdk.StackProps) { super(scope, id, props); const s3bucket = new s3.Bucket(this, "S3UploadBucket", { enforceSSL: true, removalPolicy: cdk.RemovalPolicy.DESTROY, autoDeleteObjects: true, }); const hello_function = new lambda.Function(this, "HelloFunction", { code: lambda.Code.fromInline(`def handler(event, context): return { "message": "Hello, World" }`), runtime: lambda.Runtime.PYTHON_3_12, handler: "index.handler", }); const s3_upload_state_machine = new sfn.StateMachine( this, "S3UploadStateMachine", { definitionBody: sfn.DefinitionBody.fromChainable( new sfn_tasks.LambdaInvoke(this, "HelloFunction Task", { lambdaFunction: hello_function, outputPath: "$.Payload", }).next( new sfn_tasks.CallAwsService(this, "S3 PutObject Task", { service: "s3", action: "putObject", parameters: { "Body.$": "$", Bucket: s3bucket.bucketName, "Key.$": "States.Format('{}.json', $$.Execution.StartTime)", }, iamResources: [`${s3bucket.bucketArn}/*`], }) ) ), } ); } }
HelloFunction Task
がデータを生成するLambda関数を呼び出すタスクです。Lambda関数の出力を次のタスクに引き渡します。続いて呼び出されるS3 PutObject Task
が、そのデータをS3バケットへアップロードするタスクです。CDKでAWSサービス統合を利用する場合はCallAwsService
メソッドを使用し、各パラメーターを設定するだけです。
ポイントとしては、今回タスクを分割することで、HelloFunction Task
から呼び出されるLambda関数にS3バケットへアクセスする権限の追加が不要な点です。
デプロイすると、次のようなシンプルなステートマシンが作成されます。
動作確認
ステートマシンを実行しましょう。制限などされていなければ成功するはずです。
実行できたらS3バケットを確認します。想定どおりオブジェクトが追加されていればOKです。
中身はLambda関数が出力した内容そのままとなっています。
まとめ
タスクを分割することで、データ生成部分がLambda関数以外となっても少ない修正で済みます。またS3へアップロードする部分も自分でLambda関数を用意しなければいけないのでは手間ですが、サービス統合で済ませられるのは便利ですね。